{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "slide"
    }
   },
   "source": [
    "## Moving Data Around\n",
    "* When you need to process a lot of data, a big part of the execution time of your program is devoted to moving the data between storage units."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "fragment"
    }
   },
   "source": [
    "* This notebook is **NOT** intended to be run on your personal computer. It is intended to show you the main steps needed when processing a large file on a multi-computer cluster. The main emphasis here is on the **Wall time** required for different operations."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "slide"
    }
   },
   "source": [
    "## Some terminology"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "subslide"
    }
   },
   "source": [
    "### Data Serialization.\n",
    "* Data in memory is usually stored in **data structures** that allow for fast manipulation. This often means that the amount of memory needed is significantly larger than the amount that would be needed to store the same data on disk.\n",
    "* We say that the data on disk is **serialized** and the data held in in-memory data structures is **deserialized**."
   ]
  },
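  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "A minimal sketch of the size gap described above, using only the Python standard library. The list of 1,000 floats and the use of `struct` as the serialized form are illustrative assumptions, not part of the course code.\n",
    "\n",
    "```python\n",
    "import struct\n",
    "import sys\n",
    "\n",
    "# Deserialized: a Python list holding 1,000 separate float objects.\n",
    "values = [i * 0.5 for i in range(1000)]\n",
    "in_memory = sys.getsizeof(values) + sum(sys.getsizeof(v) for v in values)\n",
    "\n",
    "# Serialized: the same numbers packed back to back, 8 bytes each.\n",
    "serialized = struct.pack('%dd' % len(values), *values)\n",
    "\n",
    "print('in memory :', in_memory, 'bytes')\n",
    "print('serialized:', len(serialized), 'bytes')\n",
    "```\n",
    "\n",
    "The exact in-memory figure depends on the Python build, but it is several times the serialized size."
   ]
  },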
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "subslide"
    }
   },
   "source": [
    "### AWS-EMR\n",
    "We demonstrate the movement of data on \"Amazon Web Services\" (AWS) \"Elastic Map Reduce\" (EMR).\n",
    "\n",
    "Recall the slide about data organization in the video \"a short history of affordable massive computing\". In the next figure we add to that slide the way it fits within AWS-EMR."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "subslide"
    }
   },
   "source": [
    "#### Three file systems: \n",
    "* **S3:** long-term persistent storage. \n",
    "* **Head Node:** standard Unix file system. \n",
    "* **HDFS:** distributed file system on the workers."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "fragment"
    }
   },
   "source": [
    "<img alt=\"\" src=\"Figures/AWS-EMR-S3.png\" style=\"height:455px;width:800px\" />"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "slide"
    }
   },
   "source": [
    "## Reading a CSV file from S3\n",
    "\n",
    "We start with a CSV file on S3, which we move through the head node to HDFS and then parse into a Spark RDD."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "subslide"
    }
   },
   "source": [
    "### Moving a file from S3 to the head node"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "slideshow": {
     "slide_type": "skip"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "/mnt/workspace/edX-Micro-Master-in-Data-Science/big-data-analytics-using-spark/notebooks/Data\n",
      "kmeans_data.txt  OldData  people.json  Weather\r\n"
     ]
    }
   ],
   "source": [
    "%cd /mnt/workspace/edX-Micro-Master-in-Data-Science/big-data-analytics-using-spark/notebooks/Data/\n",
    "!ls"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {
    "slideshow": {
     "slide_type": "skip"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "mkdir: cannot create directory ‘Weather’: File exists\n",
      "/mnt/workspace/edX-Micro-Master-in-Data-Science/big-data-analytics-using-spark/notebooks/Data/Weather\n"
     ]
    }
   ],
   "source": [
    "# create a directory on the head node to hold the data\n",
    "!mkdir Weather\n",
    "%cd Weather/"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {
    "slideshow": {
     "slide_type": "subslide"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2016-02-09 04:09:49 1511481989 ALL.csv.gz\r\n"
     ]
    }
   ],
   "source": [
    "#list files on S3\n",
    "!aws s3 ls s3://dse-weather/ALL.csv.gz\n",
    "#compressed file is about 1.5GB"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {
    "slideshow": {
     "slide_type": "fragment"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "download: s3://dse-weather/ALL.csv.gz to ./ALL.csv.gz              \n",
      "CPU times: user 956 ms, sys: 280 ms, total: 1.24 s\n",
      "Wall time: 11.9 s\n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "#copy file from S3\n",
    "!aws s3 cp s3://dse-weather/ALL.csv.gz ./ALL.csv.gz"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {
    "slideshow": {
     "slide_type": "fragment"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "CPU times: user 720 ms, sys: 260 ms, total: 980 ms\n",
      "Wall time: 54.5 s\n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "# uncompress the file (remove any stale copy first; -f keeps rm quiet if none exists)\n",
    "!rm -f ALL.csv\n",
    "!gunzip ALL.csv.gz"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {
    "slideshow": {
     "slide_type": "fragment"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "-rw-rw-r-- 1 hadoop hadoop 7668890105 Feb  9  2016 ALL.csv\r\n"
     ]
    }
   ],
   "source": [
    "!ls -l ALL.csv\n",
    "# About 7.7 GB"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {
    "slideshow": {
     "slide_type": "skip"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "station,year,measurement,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99,100,101,102,103,104,105,106,107,108,109,110,111,112,113,114,115,116,117,118,119,120,121,122,123,124,125,126,127,128,129,130,131,132,133,134,135,136,137,138,139,140,141,142,143,144,145,146,147,148,149,150,151,152,153,154,155,156,157,158,159,160,161,162,163,164,165,166,167,168,169,170,171,172,173,174,175,176,177,178,179,180,181,182,183,184,185,186,187,188,189,190,191,192,193,194,195,196,197,198,199,200,201,202,203,204,205,206,207,208,209,210,211,212,213,214,215,216,217,218,219,220,221,222,223,224,225,226,227,228,229,230,231,232,233,234,235,236,237,238,239,240,241,242,243,244,245,246,247,248,249,250,251,252,253,254,255,256,257,258,259,260,261,262,263,264,265,266,267,268,269,270,271,272,273,274,275,276,277,278,279,280,281,282,283,284,285,286,287,288,289,290,291,292,293,294,295,296,297,298,299,300,301,302,303,304,305,306,307,308,309,310,311,312,313,314,315,316,317,318,319,320,321,322,323,324,325,326,327,328,329,330,331,332,333,334,335,336,337,338,339,340,341,342,343,344,345,346,347,348,349,350,351,352,353,354,355,356,357,358,359,360,361,362,363,364,365\r\n",
      "ASN00054128,DAPR,1969,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,2,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,\r\n"
     ]
    }
   ],
   "source": [
    "!head -2 ALL.csv"
   ]
  },
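  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The wall times above can be turned into rough throughput figures. The sketch below only does arithmetic on the byte counts and timings recorded in the cell outputs above; the variable names are ours, and the numbers will differ from run to run.\n",
    "\n",
    "```python\n",
    "# Sizes and wall times copied from the cell outputs above.\n",
    "compressed_bytes = 1511481989      # ALL.csv.gz on S3\n",
    "uncompressed_bytes = 7668890105    # ALL.csv after gunzip\n",
    "download_seconds = 11.9            # aws s3 cp\n",
    "gunzip_seconds = 54.5              # rm + gunzip\n",
    "\n",
    "MB = 1e6\n",
    "download_rate = compressed_bytes / MB / download_seconds\n",
    "gunzip_rate = uncompressed_bytes / MB / gunzip_seconds\n",
    "compression_ratio = uncompressed_bytes / compressed_bytes\n",
    "\n",
    "print('S3 -> head node: %.0f MB/s' % download_rate)\n",
    "print('gunzip output  : %.0f MB/s' % gunzip_rate)\n",
    "print('compression    : %.1fx' % compression_ratio)\n",
    "```"
   ]
  },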
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "slide"
    }
   },
   "source": [
    "## Distribute file into HDFS\n",
    "Copy the file from the head-node file system to HDFS."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {
    "slideshow": {
     "slide_type": "subslide"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "/bin/sh: hadoop: command not found\n",
      "CPU times: user 2.72 ms, sys: 7.76 ms, total: 10.5 ms\n",
      "Wall time: 125 ms\n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "!hadoop fs -mkdir /weather"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {
    "slideshow": {
     "slide_type": "subslide"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "copyFromLocal: `/weather/weather.csv': File exists\n",
      "CPU times: user 36 ms, sys: 12 ms, total: 48 ms\n",
      "Wall time: 2.11 s\n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "# copy the file from the head node's local file system into HDFS\n",
    "!hadoop fs -copyFromLocal ALL.csv hdfs:///weather/weather.csv"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "slideshow": {
     "slide_type": "subslide"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "/bin/sh: hadoop: command not found\r\n"
     ]
    }
   ],
   "source": [
    "!hadoop fs -ls /weather"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "slide"
    }
   },
   "source": [
    "### Read CSV file into an RDD"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%cd /mnt/workspace/edX-Micro-Master-in-Data-Science/big-data-analytics-using-spark/notebooks/Section2-PCA/PCA/data_preparation/ \n",
    "!ls lib"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "-rw-rw-r-- 1 hadoop hadoop 1085 Mar  1 20:32 lib/numpy_pack.py\r\n"
     ]
    }
   ],
   "source": [
    "%pwd\n",
    "!ls -l lib/numpy_pack.py"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "CPU times: user 172 ms, sys: 16 ms, total: 188 ms\n",
      "Wall time: 16.6 s\n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "%pwd\n",
    "from pyspark import SparkContext\n",
    "sc = SparkContext(pyFiles=['/mnt/workspace/edX-Micro-Master-in-Data-Science/big-data-analytics-using-spark/notebooks/Section2-PCA/PCA/data_preparation/lib/numpy_pack.py'])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "CPU times: user 0 ns, sys: 4 ms, total: 4 ms\n",
      "Wall time: 367 ms\n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "RDD=sc.textFile('/weather/weather.csv')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "-rw-rw-r-- 1 hadoop hadoop 7668890105 Feb  9  2016 /mnt/workspace/edX-Micro-Master-in-Data-Science/big-data-analytics-using-spark/notebooks/Data/Weather/ALL.csv\r\n"
     ]
    }
   ],
   "source": [
    "fs_file=\"/mnt/workspace/edX-Micro-Master-in-Data-Science/big-data-analytics-using-spark/notebooks/Data/Weather/ALL.csv\"\n",
    "!ls -l $fs_file"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {
    "scrolled": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "9358395\n",
      "CPU times: user 2.21 s, sys: 5.12 s, total: 7.33 s\n",
      "Wall time: 7.32 s\n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "with open(fs_file,'r') as f:\n",
    "    text=f.readlines()\n",
    "print(len(text))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Code for packing and unpacking byte arrays"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "'Code for packing and unpacking a numpy array into a byte array.\\n   the array is flattened if it is not 1D.\\n   This is intended to be used as the interface for storing \\n   \\n   This code is intended to be used to store numpy array as fields in a dataframe and then store the \\n   dataframes in a parquet file.\\n'"
      ]
     },
     "execution_count": 20,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import numpy as np\n",
    "\"\"\"Code for packing and unpacking a numpy array into a byte array.\n",
    "   the array is flattened if it is not 1D.\n",
    "   This is intended to be used as the interface for storing \n",
    "   \n",
    "   This code is intended to be used to store numpy array as fields in a dataframe and then store the \n",
    "   dataframes in a parquet file.\n",
    "\"\"\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "def packArray(a):\n",
    "    \"\"\"\n",
    "    pack a numpy array into a bytearray that can be stored as a single \n",
    "    field in a spark DataFrame\n",
    "\n",
    "    :param a: a numpy ndarray \n",
    "    :returns: the raw bytes of the array\n",
    "    :rtype: bytearray\n",
    "\n",
    "    \"\"\"\n",
    "    if not isinstance(a, np.ndarray):\n",
    "        raise TypeError(\"input to packArray should be numpy.ndarray. It is instead \"+str(type(a)))\n",
    "    return bytearray(a.tobytes())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "def unpackArray(x,data_type=np.float16):\n",
    "    \"\"\"\n",
    "    unpack a bytearray into a numpy.ndarray\n",
    "\n",
    "    :param x: a bytearray\n",
    "    :param data_type: The dtype of the array. This is important because it determines how many bytes go into each entry in the array.\n",
    "    :returns: a numpy array\n",
    "    :rtype: a numpy ndarray of dtype data_type.\n",
    "\n",
    "    \"\"\"\n",
    "    return np.frombuffer(x,dtype=data_type)"
   ]
  },
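  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "A small round-trip check of the two helpers. This is a self-contained sketch: it re-creates the core of `packArray` and `unpackArray` inline under the hypothetical names `pack` and `unpack`, and the sample values are made up.\n",
    "\n",
    "```python\n",
    "import numpy as np\n",
    "\n",
    "def pack(a):\n",
    "    # same idea as packArray: the raw bytes of the array\n",
    "    return bytearray(a.tobytes())\n",
    "\n",
    "def unpack(x, data_type=np.float16):\n",
    "    # same idea as unpackArray: reinterpret the bytes with the given dtype\n",
    "    return np.frombuffer(x, dtype=data_type)\n",
    "\n",
    "original = np.array([1.5, np.nan, -20.0], dtype=np.float16)\n",
    "packed = pack(original)\n",
    "restored = unpack(packed)\n",
    "\n",
    "print(len(packed))   # 2 bytes per float16 entry\n",
    "print(restored)\n",
    "\n",
    "# Unpacking with the wrong dtype raises no error but changes the shape and values.\n",
    "wrong = unpack(pack(np.zeros(4, dtype=np.float16)), data_type=np.float32)\n",
    "print(wrong.shape)\n",
    "```\n",
    "\n",
    "The byte array carries no record of its dtype, which is why the reader has to pass the same `data_type` that the writer used."
   ]
  },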
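  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Range of values\n",
    "Using code that was removed, we find that the range of values is \n",
    "\n",
    "`-1000.0, 97892.0` \n",
    "\n",
    "which means that as integers we would need 32 bits, whereas as floats we can use just 16 bits. The price is precision: `float16` cannot represent every integer above 2048 exactly, and its largest finite value is 65504, so the very largest values overflow to `inf`."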
   ]
  },
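  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The bit widths discussed above can be checked directly with NumPy. This is a sketch: the value 2049.0 is an illustrative sample that we added, and only the two range endpoints come from the text.\n",
    "\n",
    "```python\n",
    "import numpy as np\n",
    "\n",
    "# int16 covers -32768..32767, so 97892 needs a wider integer type.\n",
    "print(np.iinfo(np.int16).max, np.iinfo(np.int32).max)\n",
    "\n",
    "# The largest finite float16 value.\n",
    "print(np.finfo(np.float16).max)\n",
    "\n",
    "# float16 has an 11-bit significand, so large integers are rounded,\n",
    "# and values past the largest finite value become inf.\n",
    "samples = np.array([-1000.0, 2049.0, 97892.0])\n",
    "with np.errstate(over='ignore'):\n",
    "    as_half = samples.astype(np.float16)\n",
    "print(as_half)\n",
    "```\n",
    "\n",
    "Whether this loss matters depends on how the stored values are used later; the check only makes the trade-off visible."
   ]
  },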
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "#main parsing code\n",
    "\n",
    "import numpy as np\n",
    "def parse_weather(line):\n",
    "    L=line.split(',')\n",
    "    try:\n",
    "        assert len(L)==368\n",
    "        i=2\n",
    "        L[i]=int(L[i])\n",
    "        for i in range(3,368):\n",
    "            if L[i]!='':\n",
    "                L[i]=np.float16(L[i])\n",
    "            else:\n",
    "                L[i]=np.nan\n",
    "    except Exception:\n",
    "        # any parsing error (wrong field count, non-numeric field): return (1, input line)\n",
    "        return (1,line)\n",
    "    Out=L[:3]\n",
    "    Out.append(packArray(np.array(L[3:],dtype=np.float16)))\n",
    "    # if parsing OK, return (0, parsed data)\n",
    "    return (0,Out)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "#this cell demonstrates how to test the parse_weather function on an individual row.\n",
    "Debug=False\n",
    "if Debug:\n",
    "    lines=RDD.take(10)\n",
    "    GG=parse_weather(lines[-2])\n",
    "    GG"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "CPU times: user 8 ms, sys: 0 ns, total: 8 ms\n",
      "Wall time: 26.3 ms\n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "# tag every line: (0, parsed row) on success, (1, raw line) on failure\n",
    "Parsed=RDD.map(parse_weather).cache()\n",
    "DATA=Parsed.filter(lambda x:x[0]==0).map(lambda x:x[1])    # keep only the good rows\n",
    "ERRORS=Parsed.filter(lambda x:x[0]==1).map(lambda x:x[1])  # keep the lines that failed to parse"
   ]
  },
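  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The tag-then-filter pattern can be tried without a cluster. The sketch below uses plain Python lists in place of RDDs and a hypothetical three-column parser `parse_row`; it is not the course's `parse_weather`, and the sample lines are shortened for illustration.\n",
    "\n",
    "```python\n",
    "def parse_row(line):\n",
    "    # hypothetical parser: expects three fields, the third an integer year\n",
    "    fields = line.split(',')\n",
    "    try:\n",
    "        assert len(fields) == 3\n",
    "        fields[2] = int(fields[2])\n",
    "    except (AssertionError, ValueError):\n",
    "        return (1, line)      # tag 1: could not parse, keep the raw line\n",
    "    return (0, fields)        # tag 0: parsed successfully\n",
    "\n",
    "lines = ['station,year,measurement',   # header: third field is not an int\n",
    "         'ASN00054128,DAPR,1969',\n",
    "         'IN011020501,PRCP,1939']\n",
    "\n",
    "parsed = [parse_row(l) for l in lines]\n",
    "data = [row for tag, row in parsed if tag == 0]\n",
    "errors = [row for tag, row in parsed if tag == 1]\n",
    "print(len(data), len(errors))\n",
    "```\n",
    "\n",
    "Keeping the failed lines instead of dropping them makes it possible to inspect afterwards what went wrong, as the `ERRORS` RDD does above."
   ]
  },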
  {
   "cell_type": "code",
   "execution_count": 26,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "(58) PythonRDD[3] at RDD at PythonRDD.scala:48 []\n",
      " |   PythonRDD[2] at RDD at PythonRDD.scala:48 []\n",
      " |   /weather/weather.csv MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []\n",
      " |   /weather/weather.csv HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []\n"
     ]
    }
   ],
   "source": [
    "print(DATA.toDebugString().decode())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "('PRCP records:', 2521007)\n",
      "CPU times: user 16 ms, sys: 12 ms, total: 28 ms\n",
      "Wall time: 2min 2s\n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "PRCP=DATA.filter(lambda row:row[1]=='PRCP')\n",
    "print('PRCP records:',PRCP.count())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 28,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "('bad records:', 1)\n",
      "CPU times: user 4 ms, sys: 8 ms, total: 12 ms\n",
      "Wall time: 2.3 s\n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "print('bad records:',ERRORS.count())\n",
    "#all lines: 9358395\n",
    "# only the first line (the header) is bad.\n",
    "# Good lines: 9358394"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 29,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[[u'ASN00054128',\n",
       "  u'DAPR',\n",
       "  1969,\n",
       "  bytearray(b'\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00@\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x0
0~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~\\x00~')]]"
      ]
     },
     "execution_count": 29,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "DATA.take(1)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Transform RDD into a Spark DataFrame"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 30,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "<pyspark.sql.context.SQLContext at 0x7fd7e1266650>"
      ]
     },
     "execution_count": 30,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import os\n",
    "import sys\n",
    "\n",
    "from pyspark import SparkContext\n",
    "from pyspark.sql import SQLContext\n",
    "from pyspark.sql.types import Row, StructField, StructType, StringType, IntegerType, BinaryType, FloatType\n",
    "\n",
    "# Just like using Spark requires having a SparkContext, using SQL requires an SQLContext\n",
    "sqlContext = SQLContext(sc)\n",
    "sqlContext"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 31,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "StructType(List(StructField(Station,StringType,true),StructField(Measurement,StringType,true),StructField(Year,IntegerType,true),StructField(Values,BinaryType,true)))"
      ]
     },
     "execution_count": 31,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "### Defining the Schema explicitly\n",
    "# Supplying a pre-defined schema lets the content of the RDD be simple tuples or lists, rather than Rows.\n",
    "\n",
    "# In this case we create the dataframe from an RDD of parsed records (rather than Rows) and provide the schema explicitly\n",
    "# Schema with four fields: Station, Measurement, Year and Values\n",
    "schema = StructType([StructField(\"Station\",     StringType(), True),\n",
    "                     StructField(\"Measurement\", StringType(), True),\n",
    "                     StructField(\"Year\",        IntegerType(),True),\n",
    "                     StructField(\"Values\",      BinaryType(),True)\n",
    "                    ])\n",
    "schema"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 32,
   "metadata": {
    "scrolled": true
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- Station: string (nullable = true)\n",
      " |-- Measurement: string (nullable = true)\n",
      " |-- Year: integer (nullable = true)\n",
      " |-- Values: binary (nullable = true)\n",
      "\n",
      "CPU times: user 460 ms, sys: 0 ns, total: 460 ms\n",
      "Wall time: 1.29 s\n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "# Create a DataFrame by applying the schema to the RDD and print the schema\n",
    "ALL_DataFrame = sqlContext.createDataFrame(DATA, schema)\n",
    "ALL_DataFrame.printSchema()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Write out data frame into Parquet directory"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 33,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "18/03/01 21:55:39 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.\n",
      "Deleted /weather/weather.parquet\n",
      "CPU times: user 52 ms, sys: 76 ms, total: 128 ms\n",
      "Wall time: 2.2 s\n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "!hadoop fs -rm -r /weather/weather.parquet"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 34,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "CPU times: user 4 ms, sys: 4 ms, total: 8 ms\n",
      "Wall time: 17.1 s\n"
     ]
    }
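   ],
   "source": [
    "%%time \n",
    "outfilename=\"hdfs:///weather/weather.parquet\"\n",
    "# write.save() uses Spark's default data source, which is Parquet unless spark.sql.sources.default is changed\n",
    "ALL_DataFrame.write.save(outfilename)"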
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 35,
   "metadata": {
    "scrolled": true
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "7668890105  /weather/weather.csv\r\n",
      "2302800781  /weather/weather.parquet\r\n"
     ]
    }
   ],
   "source": [
    "!hadoop fs -du /weather/"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Copy parquet directory to head node and then to S3"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 36,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "/mnt/workspace/Data\n",
      "total 0\n",
      "drwxrwxr-x 2 hadoop hadoop 199 Mar  1 20:07 stations.parquet\n"
     ]
    }
   ],
   "source": [
    "%cd /mnt/workspace/Data/\n",
    "!rm -rf weather.parquet/\n",
    "!ls -lrt"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 37,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "CPU times: user 84 ms, sys: 140 ms, total: 224 ms\n",
      "Wall time: 6.46 s\n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "!hadoop fs -copyToLocal /weather/weather.parquet weather.parquet"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 38,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "1612\t./stations.parquet\r\n",
      "2248960\t./weather.parquet\r\n",
      "2250572\t.\r\n"
     ]
    }
   ],
   "source": [
    "!du ."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 39,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "CPU times: user 16 ms, sys: 80 ms, total: 96 ms\n",
      "Wall time: 1.14 s\n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "#rm parquet directory from s3\n",
    "!aws s3 rm --recursive --quiet s3://dse-weather/weather.parquet"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 40,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "CPU times: user 344 ms, sys: 208 ms, total: 552 ms\n",
      "Wall time: 18.1 s\n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "# Copy parquet directory from headnode to s3\n",
    "!aws s3 cp --recursive --quiet ./weather.parquet s3://dse-weather/weather.parquet"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "## Loading and using a parquet file"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 41,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "stations.parquet  weather.parquet\r\n"
     ]
    }
   ],
   "source": [
    "!ls"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 42,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "!rm -rf weather.parquet/"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 43,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "CPU times: user 328 ms, sys: 224 ms, total: 552 ms\n",
      "Wall time: 17.1 s\n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "!aws s3 cp --recursive --quiet s3://dse-weather/weather.parquet ./weather.parquet"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 44,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "CPU times: user 176 ms, sys: 144 ms, total: 320 ms\n",
      "Wall time: 9.53 s\n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "!hadoop fs -copyFromLocal  weather.parquet /weather/weather.parquet"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 45,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "SELECT station,measurement,year \n",
      "FROM parquet.`/weather/weather.parquet` \n",
      "WHERE measurement=\"PRCP\" \n",
      "number of rows= 2521007\n",
      "+-----------+-----------+----+\n",
      "|    station|measurement|year|\n",
      "+-----------+-----------+----+\n",
      "|IN011020501|       PRCP|1939|\n",
      "|IN011020501|       PRCP|1940|\n",
      "|IN011020501|       PRCP|1941|\n",
      "|IN011020501|       PRCP|1942|\n",
      "|IN011020501|       PRCP|1943|\n",
      "+-----------+-----------+----+\n",
      "only showing top 5 rows\n",
      "\n",
      "CPU times: user 12 ms, sys: 0 ns, total: 12 ms\n",
      "Wall time: 2.59 s\n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "parquet_name='/weather/weather'\n",
    "query=\"\"\"SELECT station,measurement,year \n",
    "FROM parquet.`%s.parquet` \n",
    "WHERE measurement=\\\"PRCP\\\" \"\"\"%parquet_name\n",
    "print(query)\n",
    "df2 = sqlContext.sql(query)\n",
    "print('number of rows=',df2.count())\n",
    "df2.show(5)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "celltoolbar": "Slideshow",
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.3"
  },
  "toc": {
   "base_numbering": 1,
   "nav_menu": {},
   "number_sections": true,
   "sideBar": true,
   "skip_h1_title": false,
   "title_cell": "Table of Contents",
   "title_sidebar": "Contents",
   "toc_cell": false,
   "toc_position": {},
   "toc_section_display": true,
   "toc_window_display": false
  },
  "varInspector": {
   "cols": {
    "lenName": 16,
    "lenType": 16,
    "lenVar": 40
   },
   "kernels_config": {
    "python": {
     "delete_cmd_postfix": "",
     "delete_cmd_prefix": "del ",
     "library": "var_list.py",
     "varRefreshCmd": "print(var_dic_list())"
    },
    "r": {
     "delete_cmd_postfix": ") ",
     "delete_cmd_prefix": "rm(",
     "library": "var_list.r",
     "varRefreshCmd": "cat(var_dic_list()) "
    }
   },
   "types_to_exclude": [
    "module",
    "function",
    "builtin_function_or_method",
    "instance",
    "_Feature"
   ],
   "window_display": false
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
